package com.atguigu.flink.sql.function;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Created by Smexy on 2023/11/20


    提供了系统内置函数: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/systemfunctions/
 */
public class Demo1_FunctionUse
{
    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String createTableSql = "CREATE TABLE t1 (" +
            "  id STRING," +
            "  ts BIGINT," +
            "  vc INT " +
            ")  WITH (" +
            "  'connector' = 'filesystem',   " +
            "  'path' = 'data/ws.json', " +
            "  'format' = 'json'             " +
            ")";

        tableEnv.executeSql(createTableSql);

        //TableAPI
        //基于表名获取表对象
        Table t1 = tableEnv.from("t1");
        Table table = t1.select(
            Expressions.$("vc"),
            Expressions.call("rand", Expressions.$("vc"))
        );


        //SQL
        //Table table = tableEnv.sqlQuery("select id,ts,vc,rand(vc) randVc from t1");

        table
                .execute()
                .print();

    }
}
